# -*- coding: utf-8 -*-
"""
@Time: 2022/7/21 13:19
@Author: jins
@File: 向topic批量灌数据模拟入库.py
@Introduction: 
"""
import json
import threading
from datetime import datetime

from kafka import KafkaProducer

lis = [i for i in range(202200000, 202200000 + 3000) if i % 50 == 0]


def on_send_success(*args, **kwargs):
    return args


def on_send_error(*args, **kwargs):
    return args


def send_messages_to_topic(topic, i):
    for device_code in range(i, i + 50):
        message = [
            {
                "dataSource": 1,
                "responseDeviceIdsMap": {},
                "responseId": f"{device_code}",
                "responseResults": [
                    {
                        "businessCode": "device_electric_ts",
                        "businessType": "device_electric",
                        "deviceResults": [
                            {
                                "childDeviceCode": "1",
                                "childDeviceId": str(device_code * 10 + 1),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.9,
                                        "pc_actual_kw": -2.6985,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.383,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": 99.9,
                                        "pa_actual_kw": 2.7881,
                                        "indfat_origin_kwh": 0.66,
                                        "pfb_origin_empty": 99.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8392,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6996,
                                        "iab_origin_d": 240.0,
                                        "pfa_origin_empty": 99.9,
                                        "pft_origin_empty": 99.9,
                                        "pt_actual_kw": 2.8263,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 0.0,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.4,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 0.0,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 0.0,
                                        "qt_actual_kvar": 1.9046,
                                        "efat_actual_kwh": 0.6,
                                        "ub_origin_v": 219.0,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 2.7369,
                                        "iac_origin_d": 120.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "2",
                                "childDeviceId": str(device_code * 10 + 2),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6885,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.384,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": 99.9,
                                        "pa_actual_kw": 3.7881,
                                        "indfat_origin_kwh": 0.66,
                                        "pfb_origin_empty": 99.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8492,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6896,
                                        "iab_origin_d": 240.0,
                                        "pfa_origin_empty": 99.9,
                                        "pft_origin_empty": 99.9,
                                        "pt_actual_kw": 3.8264,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 0.0,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 0.0,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 0.0,
                                        "qt_actual_kvar": 1.8046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 3.7369,
                                        "iac_origin_d": 120.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "3",
                                "childDeviceId": str(device_code * 10 + 3),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.9,
                                        "pc_actual_kw": -2.6785,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.385,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": 99.9,
                                        "pa_actual_kw": 4.7881,
                                        "indfat_origin_kwh": 50762.76,
                                        "pfb_origin_empty": 99.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8592,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6796,
                                        "iab_origin_d": 240.0,
                                        "pfa_origin_empty": 99.9,
                                        "pft_origin_empty": 99.9,
                                        "pt_actual_kw": 4.8265,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 50446.93,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.4,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 50446.93,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 0.0,
                                        "qt_actual_kvar": 1.7046,
                                        "efat_actual_kwh": 0.8,
                                        "ub_origin_v": 219.0,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 4.7369,
                                        "iac_origin_d": 120.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "4",
                                "childDeviceId": str(device_code * 10 + 4),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "5",
                                "childDeviceId": str(device_code * 10 + 5),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "6",
                                "childDeviceId": str(device_code * 10 + 6),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "7",
                                "childDeviceId": str(device_code * 10 + 7),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "8",
                                "childDeviceId": str(device_code * 10 + 8),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "9",
                                "childDeviceId": str(device_code * 10 + 9),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10341.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            },
                            {
                                "childDeviceCode": "10",
                                "childDeviceId": str(device_code * 10 + 10),
                                "code": 20000,
                                "data": [
                                    {
                                        "ua_origin_v": 218.3,
                                        "pc_actual_kw": -2.6685,
                                        "erat_actual_kwh": 0.0,
                                        "ic_origin_a": -14.386,
                                        "indrrt_origin_kvarh": 0.0,
                                        "efrt_actual_kvarh": 0.46,
                                        "real_time": 1635498900000,
                                        "pfc_origin_empty": -85.5,
                                        "pa_actual_kw": 5.7881,
                                        "indfat_origin_kwh": 11231.66,
                                        "pfb_origin_empty": 82.9,
                                        "data_time": 1635498900000,
                                        "qb_actual_kvar": 1.8692,
                                        "indrat_origin_kwh": 0.0,
                                        "uab_origin_d": 240.0,
                                        "qa_actual_kvar": 1.6696,
                                        "iab_origin_d": 208.1,
                                        "pfa_origin_empty": 85.3,
                                        "pft_origin_empty": 82.9,
                                        "pt_actual_kw": 5.8266,
                                        "uac_origin_d": 120.0,
                                        "indrp1_origin_kvarh": 10342.52,
                                        "indrp3_origin_kvarh": 0.0,
                                        "indrp4_origin_kvarh": 0.0,
                                        "uc_origin_v": 219.5,
                                        "indrp2_origin_kvarh": 0.0,
                                        "ib_origin_a": 15.043,
                                        "indfrt_origin_kvarh": 10341.52,
                                        "errt_actual_kvarh": 0.0,
                                        "qc_actual_kvar": -1.6343,
                                        "iaa_origin_d": 331.4,
                                        "qt_actual_kvar": 1.6046,
                                        "efat_actual_kwh": 0.7,
                                        "ub_origin_v": 218.9,
                                        "uaa_origin_d": 0.0,
                                        "ia_origin_a": 15.132,
                                        "pb_actual_kw": 5.7369,
                                        "iac_origin_d": 269.0
                                    }
                                ],
                                "deviceCode": f"{device_code}",
                                "deviceId": f"{device_code}",
                                "frequency": "mi15",
                                "regionCode": "1",
                                "success": True,
                                "sysType": [
                                    "es"
                                ]
                            }
                        ],
                        "schemaCode": "QKL8163_2_2018",
                        "serviceIdentifier": "0DH"
                    }
                ],
                "responseTime": "2022-07-21 22:22:22",
                "responseType": 2,
                "sysCode": "tcp",
                "time": 22
            }
        ]
        try:
            if isinstance(message, dict) or isinstance(message, list):
                send_message = json.dumps(message).encode()
            elif isinstance(message, str):
                send_message = message.encode()
            # p.send(topic, send_message).add_callback(on_send_success).add_errback(on_send_error)
            f = p.send(topic, send_message)
            f.get(10)
        except Exception as e:
            print(e)


if __name__ == '__main__':
    p = KafkaProducer(bootstrap_servers=['192.168.11.85:9091', '192.168.11.85:9092', '192.168.11.85:9093'],
                      api_version=(0, 11), buffer_memory=33554432 * 1000, retries=5)
    # p = KafkaProducer(bootstrap_servers=['10.0.17.17:9001', '10.0.17.17:9002', '10.0.17.17:9003'],
    #                   api_version=(0, 11), buffer_memory=33554432 * 1000, acks=0)
    time_start = datetime.now()
    print(time_start)
    print(lis)
    thread_list = []
    for i in lis:
        t = threading.Thread(target=send_messages_to_topic, args=('COMMUNICATION_RESPONSE_DATA', i,))
        t.start()
        thread_list.append(t)
    print('线程启动完成')
    for i in thread_list:
        i.join()
    time_end = datetime.now()
    p.flush()
    p.close()
    print(time_end-time_start)
